-
Notifications
You must be signed in to change notification settings - Fork 343
Add delete file index to pyiceberg and support equality delete reads #2255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
I noticed that this PR addresses the same issue/feature as the one I was working on in here. However, your implementation is more complete (by supporting reading equality deletes and deletion vectors), so I think it makes sense to move forward with this one instead. (cc: @sungwy, since you reviewed my PR) |
oops, sorry @gabeiglio, I was searching for positional deletes in github search and i didnt see that you were already working on it in that PR. Looks like there are some parts of the PR that is still super useful to get merged, like the validates. |
Yea exactly, should have been more clear on my message, my implementation for DeleteFileIndex was a scope creep to achieve the validation. so now that PR can be only for the validation instead of partition maps, delete file index, etc. :) @kevinjqliu |
pyiceberg/io/pyarrow.py
Outdated
@@ -978,18 +979,23 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi | |||
raise ValueError(f"Unsupported file format: {file_format}") | |||
|
|||
|
|||
def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]: | |||
def _read_deletes(io: FileIO, data_file: DataFile) -> Union[Dict[str, pa.ChunkedArray], pa.Table]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the output signature and the role of this function is convoluted.
Would it make sense to have two separate functions instead?
pyiceberg/io/pyarrow.py
Outdated
equality_delete_tasks = [] | ||
for task in tasks: | ||
equality_deletes = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES] | ||
if equality_deletes: | ||
for delete_file in equality_deletes: | ||
# create a group of datafile to associated equality delete | ||
equality_delete_tasks.append((task.file.file_path, delete_file)) | ||
|
||
if equality_delete_tasks: | ||
executor = ExecutorFactory.get_or_create() | ||
# Processing equality delete tasks in parallel like position deletes | ||
equality_delete_results = executor.map( | ||
lambda args: (args[0], _read_deletes(io, args[1])), | ||
equality_delete_tasks, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are already getting a subset of the files that have equality deletes, so it would make sense to use a different function to read the deletes, than using the convoluted function _read_deletes
pyiceberg/io/pyarrow.py
Outdated
deletes_per_file: Dict[str, List[ChunkedArray]] = {} | ||
unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks])) | ||
if len(unique_deletes) > 0: | ||
def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Union[Dict[str, pa.ChunkedArray], pa.Table]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be:
def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Union[Dict[str, pa.ChunkedArray], pa.Table]: | |
def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[Union[pa.ChunkedArray, pa.Table]]]: |
pyiceberg/io/pyarrow.py
Outdated
@@ -1679,7 +1749,7 @@ def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]: | |||
break | |||
|
|||
def _record_batches_from_scan_tasks_and_deletes( | |||
self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]] | |||
self, tasks: Iterable[FileScanTask], deletes_per_file: Union[Dict[str, pa.ChunkedArray], pa.Table] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self, tasks: Iterable[FileScanTask], deletes_per_file: Union[Dict[str, pa.ChunkedArray], pa.Table] | |
self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[Union[pa.ChunkedArray, pa.Table]]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @geruh - thanks for working on this PR, and sorry for the delayed review.
I've added some review feedback. Let me know your thoughts!
@sungwy Thanks a lot! I have done the suggested changes, could you take another look at it? |
Closes #1210
Summary
This work was primarily done by @rutb327 while I provided guidance!
This PR adds equality delete read support to PyIceberg by implementing the delete file indexing system that matches delete files to data files, mimicking the behavior found in Iceberg Core. With this implementation we are able to index files and now read equality deletes during table scans.
Design details
Delete File Index
The new
DeleteFileIndex
class centralizes handling of all delete file types: positional deletes, equality deletes, and deletion vectors. It organizes deletes by type (equality vs. positional), partition (usingPartitionMap
for spec-aware grouping), and path (for path-specific positional deletes). This enables efficient lookup during table scans, reducing unnecessary delete file processing.Equality Delete support
Equality delete files are loaded as PyArrow Tables with their respective equality ids for the schema and for each we are grouping tables with the same set equality id's to reduce anti join operations.
Testing
Added tests from the core iceberg DeleteFileIndex test suite and added some tests with dummy files. As well as some manual testing with a flink setup.
Are there any user-facing changes?
Yes can read tables with equality deletes